Classification of RDD Operations
transformation
Transformations are lazy: they are not executed immediately, but only when an action is encountered, which triggers the actual computation.
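This laziness can be illustrated without Spark at all: a Python generator expression behaves analogously (building one is like declaring a transformation; consuming it, like an action, is what triggers the work). This is only an analogy, not Spark code:

```python
calls = []

def double(x):
    # Record each invocation so we can see *when* the work happens.
    calls.append(x)
    return x * 2

# "Transformation": nothing is computed yet, just a recipe.
lazy = (double(x) for x in [1, 2, 3])
assert calls == []  # no element has been processed so far

# "Action": consuming the generator triggers the computation.
result = list(lazy)
assert result == [2, 4, 6]
assert calls == [1, 2, 3]  # the work happened only now
```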
map
Applies the function func to every element of the dataset.

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("test")
    sc = SparkContext(conf=conf)

    def my_map():
        """Multiply each element by 2."""
        data = [1, 2, 3, 4, 5]
        rdd1 = sc.parallelize(data)
        rdd2 = rdd1.map(lambda x: x * 2)
        print(rdd2.collect())
        # [2, 4, 6, 8, 10]

    def my_app2():
        """Turn each word into a (word, 1) pair."""
        data = ["Unable", "to", "native-hadoop", "library", "for", "your", "platform"]
        rdd3 = sc.parallelize(data)
        rdd4 = rdd3.map(lambda x: (x, 1))
        print(rdd4.collect())

    my_map()
    my_app2()
    sc.stop()
filter
Selects all elements for which func returns true.

def my_filter():
    """Keep the elements greater than 3."""
    data = [1, 2, 3, 4, 5]
    rdd5 = sc.parallelize(data)
    rdd6 = rdd5.filter(lambda x: x > 3)
    print(rdd6.collect())
    # [4, 5]
flatMap
Applies func to each element and then flattens the results into a single sequence.

def my_flatmap():
    """Split each line into words; the per-line lists are flattened."""
    # Body reconstructed from context -- the original snippet was truncated.
    data = ["hello spark", "hello world", "hello world"]
    rdd7 = sc.parallelize(data)
    print(rdd7.flatMap(lambda line: line.split(" ")).collect())
    # ['hello', 'spark', 'hello', 'world', 'hello', 'world']
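The difference between map and flatMap can be seen without a Spark runtime by applying the same split both ways in plain Python (using the word data from the reduceByKey example below):

```python
from itertools import chain

data = ["hello spark", "hello world", "hello world"]

# map keeps one output element per input element (here: a list per line)...
mapped = [line.split(" ") for line in data]
# ...while flatMap flattens the per-element lists into one sequence.
flat_mapped = list(chain.from_iterable(line.split(" ") for line in data))

print(mapped)       # [['hello', 'spark'], ['hello', 'world'], ['hello', 'world']]
print(flat_mapped)  # ['hello', 'spark', 'hello', 'world', 'hello', 'world']
```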
groupByKey
Groups the values with the same key together.

def my_groupby():
    """Group the (word, 1) pairs by word."""
    # Body reconstructed from context -- the original snippet was truncated.
    data = ["hello spark", "hello world", "hello world"]
    rdd8 = sc.parallelize(data)
    grouped = rdd8.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).groupByKey()
    print(grouped.map(lambda kv: (kv[0], list(kv[1]))).collect())
    # e.g. [('hello', [1, 1, 1]), ('spark', [1]), ('world', [1, 1])] -- order depends on partitioning
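What groupByKey computes can be sketched in plain Python with a dict of lists; the (word, 1) pairs below are the same ones produced in the reduceByKey example:

```python
from collections import defaultdict

pairs = [('hello', 1), ('spark', 1), ('hello', 1),
         ('world', 1), ('hello', 1), ('world', 1)]

# Collect every value under its key, like groupByKey does per partition/key.
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)

print(dict(groups))  # {'hello': [1, 1, 1], 'spark': [1], 'world': [1, 1]}
```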
reduceByKey
Groups the values with the same key together and combines them with the given function.

def my_reducebykey():
    """Count words with reduceByKey."""
    data = ["hello spark", "hello world", "hello world"]
    rdd9 = sc.parallelize(data)
    rdd10 = rdd9.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    print(rdd10.collect())
    # [('hello', 1), ('spark', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1)]
    reduce_by_key_rdd = rdd10.reduceByKey(lambda a, b: a + b)
    print(reduce_by_key_rdd.collect())
    # [('world', 2), ('hello', 3), ('spark', 1)]
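Because the dataset is tiny, the reduceByKey result can be checked by hand in plain Python (no Spark needed); the per-key combination is the same `lambda a, b: a + b`:

```python
pairs = [('hello', 1), ('spark', 1), ('hello', 1),
         ('world', 1), ('hello', 1), ('world', 1)]

# Fold each value into a running total per key, as reduceByKey does.
counts = {}
for word, n in pairs:
    counts[word] = counts.get(word, 0) + n  # lambda a, b: a + b, applied per key

print(counts)  # {'hello': 3, 'spark': 1, 'world': 2}
```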
sortByKey
sortByKey sorts the RDD by key, in ascending order by default.
For example, to sort the result above, [('world', 2), ('hello', 3), ('spark', 1)], by the count in descending order:

def my_sortbykey():
    # Body reconstructed from context -- the original snippet was truncated.
    data = [("world", 2), ("hello", 3), ("spark", 1)]
    rdd = sc.parallelize(data)
    # sortByKey only sorts by key, so swap to (count, word), sort descending, swap back.
    result = (rdd.map(lambda kv: (kv[1], kv[0]))
                 .sortByKey(ascending=False)
                 .map(lambda kv: (kv[1], kv[0])))
    print(result.collect())
    # [('hello', 3), ('world', 2), ('spark', 1)]
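One way to sort the pairs by their count in descending order is to swap each pair, sort by the new key, and swap back; the same idea in plain Python, with `reverse=True` playing the role of descending order:

```python
pairs = [('world', 2), ('hello', 3), ('spark', 1)]

# Swap to (count, word), sort descending by the new key, then swap back.
swapped = sorted(((n, w) for w, n in pairs), reverse=True)
result = [(w, n) for n, w in swapped]

print(result)  # [('hello', 3), ('world', 2), ('spark', 1)]
```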
union
Merges two RDDs into one; duplicate elements are kept.

def my_union():
    # Body reconstructed from context -- the original snippet was truncated.
    rdd1 = sc.parallelize([1, 2, 3, 4, 6])
    rdd2 = sc.parallelize([4, 5, 6])
    print(rdd1.union(rdd2).collect())
    # [1, 2, 3, 4, 6, 4, 5, 6] -- the duplicates 4 and 6 appear twice
distinct
Removes duplicate elements.

def my_distinct():
    rdd1 = sc.parallelize([1, 2, 3, 4, 6])
    rdd2 = sc.parallelize([4, 5, 6])
    rdd3 = rdd1.union(rdd2)
    print(rdd3.distinct().collect())
    # [4, 1, 5, 2, 6, 3] -- the order depends on partitioning
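The expected contents (ignoring Spark's partition-dependent ordering) can be verified in plain Python, where list concatenation stands in for union and a set for distinct:

```python
merged = [1, 2, 3, 4, 6] + [4, 5, 6]  # union keeps duplicates

print(sorted(merged))       # [1, 2, 3, 4, 4, 5, 6, 6]
print(sorted(set(merged)))  # [1, 2, 3, 4, 5, 6] -- distinct drops the repeats
```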
join
An inner join by default; leftOuterJoin, rightOuterJoin, and fullOuterJoin are also available.

def my_join():
    rdd1 = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
    rdd2 = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
    print(rdd1.join(rdd2).collect())
    # [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3'))] -- inner join
    print(rdd1.leftOuterJoin(rdd2).collect())
    # [('A', ('a1', 'a2')), ('F', ('f1', None)), ('F', ('f2', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None))] -- left outer join
    print(rdd1.rightOuterJoin(rdd2).collect())
    # [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('E', (None, 'e1'))] -- right outer join
    print(rdd1.fullOuterJoin(rdd2).collect())
    # [('A', ('a1', 'a2')), ('F', ('f1', None)), ('F', ('f2', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None)), ('E', (None, 'e1'))] -- full outer join
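The join results above can be cross-checked in plain Python: an inner join keeps only keys present on both sides, while a left outer join additionally pads missing right-side values with None:

```python
left = [("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")]
right = [("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")]

# Inner join: every pair of values whose keys match.
inner = [(k, (lv, rv)) for k, lv in left for rk, rv in right if k == rk]
print(sorted(inner))
# [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3'))]

# Left outer join: the inner matches, plus unmatched left keys padded with None.
right_keys = {k for k, _ in right}
left_outer = inner + [(k, (lv, None)) for k, lv in left if k not in right_keys]
print(sorted(left_outer, key=lambda kv: kv[0]))
```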
action
Actions return the result of the computation to the driver, e.g. collect, count, take, reduce.

def my_action():
    # Body reconstructed from context -- the original snippet was truncated.
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)
    print(rdd.collect())                   # [1, 2, 3, 4, 5]
    print(rdd.count())                     # 5
    print(rdd.take(3))                     # [1, 2, 3]
    print(rdd.reduce(lambda a, b: a + b))  # 15
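As a plain-Python sketch of what these four actions return on a small dataset (list slicing stands in for take, functools.reduce for reduce):

```python
from functools import reduce

data = [1, 2, 3, 4, 5]

collected = list(data)                    # collect: the whole dataset back on the driver
count = len(data)                         # count: number of elements
first_three = data[:3]                    # take(3): the first 3 elements
total = reduce(lambda a, b: a + b, data)  # reduce: fold the elements with a function

print(collected, count, first_three, total)  # [1, 2, 3, 4, 5] 5 [1, 2, 3] 15
```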